In [1]:
%run startup.py
In [2]:
%%javascript
// Fetch and execute the helper script below via jQuery; judging by its file
// name it builds the notebook's table of contents (TODO confirm).
$.getScript('./assets/js/ipython_notebook_toc.js')
source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)
This tree can help you find the ReactiveX Observable operator you’re looking for.
There are no configured behind-the-scenes imports or code except `startup.py`, which defines output helper functions, mainly:

- `rst`, `reset_start_time`: resets a global timer, in order to have use cases starting from 0.
- `subs(observable)`: subscribes to an observable, printing notifications with time, thread, and value.

All other code is explicitly given in the notebook.
Since all initialisation of tools is in the first cell, you always have to run the first cell after an IPython kernel restart.
All other cells are autonomous.
In the use case functions, in contrast to the official examples, we simply use `rand` quite often (mapped to `randint(0, 100)`), to demonstrate when/how often observable sequences are generated and when their result is buffered for various subscribers.
When in doubt, run the cell again; you might have been "lucky" and got the same random number.
The (bold printed) operator functions are linked to the official documentation and were created roughly analogous to the RxJS examples. The rest of the TOC lines link to anchors within the notebooks.
When the output is not in marble format we display it like so:
new subscription on stream 276507289
3.4 M [next] 1.4: {'answer': 42}
3.5 T1 [cmpl] 1.6: fin
where the lines are synchronously printed as they happen. "M" and "T1" would be thread names ("M" is the main thread).
For each use case, in `reset_start_time()` (alias `rst`), a global timer is set to 0 and we show the offset to it, in milliseconds with one decimal place, and also the offset to the start of the stream subscription. In the example, 3.4 and 3.5 are milliseconds since the global counter reset, while 1.4 and 1.6 are offsets to the start of the subscription.
In [3]:
# O.just demo: rand() is evaluated once, when the dict below is built, so the
# stream carries one fixed value that every subscriber receives.
reset_start_time(O.just)
stream = O.just({'answer': rand()})
disposable = subs(stream)
sleep(0.5)
disposable = subs(stream) # same answer
# all stream operators work, it's a real stream:
disposable = subs(stream.map(lambda x: x.get('answer', 0) * 2))
In [4]:
print('There is a little API difference to RxJS, see Remarks:\n')
rst(O.start)


def f():
    """Log that the function ran, then return a random number."""
    log('function called')
    return rand()


# Two subscriptions to the same O.start stream.
stream = O.start(func=f)
d = subs(stream)
d = subs(stream)

header("Exceptions are handled correctly (an observable should never except):")


def breaking_f():
    """Always fail with a ZeroDivisionError."""
    return 1 / 0


# Same pattern as above, but the wrapped function raises.
stream = O.start(func=breaking_f)
d = subs(stream)
d = subs(stream)

# startasync: only in python3 and possibly here(?) http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.Future
#stream = O.start_async(f)
#d = subs(stream)
In [5]:
rst(O.from_iterable)
# aliases: O.from_, O.from_list

# 1. From a tuple: rand() is evaluated once, when the tuple is built.
stream = O.from_iterable((1, 2, rand()))
d = subs(stream)
# d = subs(stream) # same result

# 2. From a generator: rand() is called lazily, as the items are pulled.
# A generator expression is single-pass, so a second subscription to this
# stream would find it already exhausted.
gen = (rand() for j in range(3))
stream = O.from_iterable(gen)
d = subs(stream)
In [6]:
rst(O.from_callback)
# in my words: In the on_next of the subscriber you'll have the original arguments,
# potentially objects, e.g. user original http requests.
# i.e. you could merge those with the result stream of a backend call to
# a webservice or db and send the request.response back to the user then.


def g(f, a, b):
    """Invoke the callback ``f`` with ``(a, b)``, then log that it was called."""
    f(a, b)
    log('called f')


# The lambda only reorders the arguments so that the callback, which is passed
# last here, becomes g's first parameter.
stream = O.from_callback(lambda a, b, f: g(f, a, b))('fu', 'bar')
d = subs(stream.delay(200))
# d = subs(stream.delay(200)) # does NOT work
In [7]:
rst()
# start a stream of 0, 1, 2, .. after 200 ms, with a delay of 100 ms:
stream = (
    O.timer(200, 100)
    .time_interval()
    .map(lambda x: 'val:%s dt:%s' % (x.value, x.interval))
    .take(3)
)
d = subs(stream, name='observer1')
# intermix directly with another one
d = subs(stream, name='observer2')
In [8]:
rst(O.repeat)
# repeat is over *values*, not function calls. Use generate or create for function calls!
# (time.time() below is evaluated once, before O.repeat ever sees the dict.)
subs(O.repeat({'rand': time.time()}, 3))

header('do while:')
l = []


def condition(x):
    """Record one call in ``l``; true only while fewer than two calls were recorded."""
    l.append(1)
    return len(l) < 2


stream = O.just(42).do_while(condition)
d = subs(stream)
In [9]:
rx = O.create
rst(rx)


def f(obs):
    """Subscribe function: called once for every observer.

    Pushes two random values, completes, and returns a cleanup callable.
    """
    obs.on_next(rand())
    obs.on_next(rand())
    obs.on_completed()

    def cleanup():
        log('cleaning up...')

    return cleanup


# the delay causes the cleanup to be called before the subscriber gets the values
stream = O.create(f).delay(200)
d = subs(stream)
d = subs(stream)

sleep(0.5)
rst(title='Exceptions are handled nicely')
l = []


def excepting_f(obs):
    """Subscribe function that fails part-way through.

    Every iteration appends to the module-level list ``l``; the division
    raises ZeroDivisionError on the iteration where ``len(l)`` reaches 3.
    Because ``l`` is shared between subscriptions, later subscribers see
    different divisors unless ``l`` is reset first.
    """
    for i in range(3):
        l.append(1)
        obs.on_next('%s %s (observer hash: %s)' % (i, 1. / (3 - len(l)), hash(obs)))
    obs.on_completed()


stream = O.create(excepting_f)
d = subs(stream)
d = subs(stream)

rst(title='Feature or Bug?')
print('(where are the first two values?)')
# Same subscribe function as above (defined once and reused); only the counter
# is reset and the stream is delayed this time.
l = []
stream = O.create(excepting_f).delay(100)
d = subs(stream)
d = subs(stream)
# I think its an (amazing) feature, preventing to process functions results of later(!) failing functions
In [10]:
rx = O.generate
rst(rx)
"""The basic form of generate takes four parameters:
the first item to emit
a function to test an item to determine whether to emit it (true) or terminate the Observable (false)
a function to generate the next item to test and emit based on the value of the previous item
a function to transform items before emitting them
"""


def generator_based_on_previous(x):
    """Third argument to generate: derive the next item by adding 1.1."""
    return x + 1.1


def doubler(x):
    """Fourth argument to generate: double the item before it is emitted."""
    return 2 * x


d = subs(rx(0, lambda x: x < 4, generator_based_on_previous, doubler))
In [11]:
rx = O.generate_with_relative_time
rst(rx)
# The first four arguments presumably play the same roles as in O.generate;
# the fifth looks like a per-item time selector (TODO confirm against the API).
stream = rx(
    1,                 # first item
    lambda x: x < 4,   # continue while this holds
    lambda x: x + 1,   # next item from the previous one
    lambda x: x,       # emit the item unchanged
    lambda t: 100      # presumably the relative time in ms for each item
)
d = subs(stream)
In [12]:
# O.defer demo: the factory lambda, and therefore rand(), is run per
# subscription, so each subscriber gets its own stream.
rst(O.defer)
# plural! (unique per subscription)
streams = O.defer(lambda: O.just(rand()))
d = subs(streams)
d = subs(streams) # gets other values - created by subscription!
In [13]:
# evaluating a condition at subscription time in order to decide which of two streams to take.
rst(O.if_then)
cond = True


def should_run():
    """Return the current value of the module-level flag ``cond``."""
    return cond


streams = O.if_then(should_run, O.return_value(43), O.return_value(56))
d = subs(streams)

log('condition will now evaluate falsy:')
cond = False
# rand() is evaluated once here, while the alternative stream is being built.
streams = O.if_then(should_run, O.return_value(43), O.return_value(rand()))
d = subs(streams)
d = subs(streams)
In [14]:
# O.range demo; the arguments are presumably (start, count) - TODO confirm.
rst(O.range)
d = subs(O.range(0, 3))
(you can `.publish()` it to get an easy "hot" observable)
In [5]:
rst(O.interval)
# Format each time_interval item through ItemGetter; the lambda's second
# parameter (v) is accepted but never used.
d = subs(
    O.interval(100)
    .time_interval()
    .map(lambda x, v: '%(interval)s %(value)s' % ItemGetter(x))
    .take(3)
)
In [16]:
# O.empty demo: per the ReactiveX docs, Empty emits no items and terminates normally.
rst(O.empty)
d = subs(O.empty())
In [17]:
# O.never demo: per the ReactiveX docs, Never emits no items and never terminates.
rst(O.never)
d = subs(O.never())
In [7]:
# Error-stream demo. Note that the exception *class* is passed, not an instance.
# NOTE(review): presumably this yields a stream that only signals the error - confirm.
rst(O.on_error)
d = subs(O.on_error(ZeroDivisionError))
In [ ]: